
Analytics Function Types

Sources

fs-source

This type reads a file on a S3 bucket and outputs its content as a DataFrame.

- id: read
  type: fs-source
  addr: 192.168.100.237:31318
  topics: s3a://testcsv/mycsv.csv
  user: minioadmin
  password: password
  asTable: mytable # (6)!
  format: csv # (1)!
  schema: |
    a INT,
    b INT,
    c INT,
    d STRING
  meta:
    delimiter: "," # (5)!
    encoding: utf-8 # (4)!
    csvHeader: true # (3)!
    ignore-parse-errors: false
    includeFileMetadata: false # (2)!
  out:
    - process
  1. The format attribute specifies the format of the files. Allowed formats are:
    • json
    • csv
    • parquet
    • delta
    • raw
    • text
    • big
  2. Optional; set to true (false by default) to include file metadata in the output DataFrame.
  3. When reading CSV, KFlow assumes by default that the file has a header row. If the file does not have headers, set this attribute to false.
  4. The encoding attribute specifies the encoding of the CSV file. The default is utf-8. This value is optional, cannot currently be changed, and is shown here for completeness.
  5. The delimiter attribute specifies the delimiter of the CSV file. The default is ,.
  6. If asTable is set, the DataFrame is registered as a temporary table in the Spark session. This allows subsequent Tasks to run SQL queries on the DataFrame.

csv format

See the example above. If the schema is not given, it is inferred from the file.
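As a rough illustration, the CSV options above (delimiter, csvHeader, and the declared schema) behave like the following pure-Python sketch. The helper name and the use of the csv module are illustrative only, not KFlow's actual implementation.

```python
import csv
import io

def read_csv_records(text, delimiter=",", csv_header=True):
    """Illustrative sketch of fs-source csv handling: split on the
    configured delimiter and, when csvHeader is true, skip the header row."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    rows = list(reader)
    if csv_header:
        rows = rows[1:]  # the first line holds column names, not data
    # Apply the declared schema: a, b, c are INT, d is STRING
    return [
        {"a": int(r[0]), "b": int(r[1]), "c": int(r[2]), "d": r[3]}
        for r in rows
    ]

sample = "a,b,c,d\n1,2,3,x\n4,5,6,y\n"
records = read_csv_records(sample)
```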

json format

Reads a JSON Lines file given in topics. The input is read twice: first to infer the schema, then to read the data.
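The two-pass behaviour can be sketched as follows; the helpers below are illustrative, built on the json stdlib module, and are not KFlow's actual reader.

```python
import json

def infer_jsonl_schema(lines):
    """First pass: union the keys and value types seen across all records."""
    schema = {}
    for line in lines:
        for key, value in json.loads(line).items():
            schema.setdefault(key, type(value).__name__)
    return schema

def read_jsonl(lines):
    """Second pass: materialise the records, now that the schema is known."""
    schema = infer_jsonl_schema(lines)              # pass 1
    records = [json.loads(line) for line in lines]  # pass 2
    return schema, records

lines = ['{"a": 1, "b": "x"}', '{"a": 2, "b": "y", "c": 3.5}']
schema, records = read_jsonl(lines)
```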

parquet format

Loads a parquet file. The schema is inferred from the file.

delta format

Loads a Delta Lake file. The schema is inferred from the file.

raw format

Loads a binary file. The output schema is the following :

  • path: string
  • modificationTime: string
  • length: long
  • content: binary (the content of the file)

text format

Reads a text file. Each line is a record with a single column named value. If the option delimiter: eof is given, the whole file is read as a single record.

- id: read
  type: fs-source
  format: text
  addr: 192.168.100.237:31318
  topics: s3a://test/mytext.text
  user: minioadmin
  password: password
  schema: |
    value STRING
  meta:
    delimiter: eof
  out:
    - process
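The difference between the default line mode and delimiter: eof can be sketched like this (an illustrative helper, not KFlow's reader):

```python
def read_text_records(text, delimiter=None):
    """Illustrative: by default, yield one record per line; with
    delimiter='eof', the whole file becomes a single record."""
    if delimiter == "eof":
        return [{"value": text}]
    return [{"value": line} for line in text.splitlines()]

content = "first line\nsecond line\n"
per_line = read_text_records(content)
whole = read_text_records(content, delimiter="eof")
```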

big format

- id: read
  type: fs-source
  format: big
  addr: 192.168.100.237:31318
  topics: s3a://test/subdir
  user: minioadmin
  password: password
  schema: |
    local_path STRING
  meta: # (1)!
    ignoreCorruptFiles: false
    ignoreMissingFiles: false
    pathGlobFilter: "*.png"
    recursiveFileLookup: true
  out:
    - process
  1. The options shown here may be used to control file discovery.

The topics field points to a list of files, which are all downloaded locally. The result is a DataFrame with a local_path column containing the path to each downloaded file.
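The effect of pathGlobFilter and recursiveFileLookup can be sketched with the stdlib (the helper below is illustrative; the actual options are handled by the underlying Spark file source):

```python
import fnmatch
import os
import tempfile

def list_matching_files(root, path_glob_filter="*", recursive=True):
    """Illustrative sketch of pathGlobFilter / recursiveFileLookup:
    collect file paths under root whose basename matches the glob."""
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if fnmatch.fnmatch(name, path_glob_filter):
                matches.append(os.path.join(dirpath, name))
        if not recursive:
            break  # only look at the top-level directory
    return sorted(matches)

# Build a small directory tree to demonstrate the filter.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "sub"))
for rel in ("a.png", "b.txt", os.path.join("sub", "c.png")):
    open(os.path.join(root, rel), "w").close()

pngs = list_matching_files(root, "*.png", recursive=True)
top_only = list_matching_files(root, "*.png", recursive=False)
```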

sim-source

Generates data with the following schema:

a    b    c
0    0    0
1    1    2
…    …    …
99   99   99

The number of generated lines is 100 by default; it can be adjusted with the size meta parameter:

- id: read
  type: sim-source
  schema: | # (1)!
    a INT,
    b INT,
    c INT
  meta:
    size: 1000 # (2)!
  out:
    - process
  1. The schema is optional, as it is known and constant.
  2. Generates 1000 lines of data.
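A minimal sketch of what sim-source produces is shown below. Only the schema (three integer columns a, b, c) and the row count are documented behaviour; the exact generated values used here are an assumption for illustration.

```python
def generate_rows(size=100):
    """Illustrative sketch of sim-source: emit `size` rows with integer
    columns a, b, c. The concrete values below are an assumption, not
    KFlow's documented generation rule."""
    return [{"a": i, "b": i, "c": 2 * i} for i in range(size)]

default_rows = generate_rows()       # 100 rows by default
sized_rows = generate_rows(size=1000)  # size meta parameter
```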

jdbc-source, sql-source, postgresql-source

All these types read data from a PostgreSQL table in a database. For clarity, use the type postgresql-source.

In the following example, adapt host, port, dbname, user, password and table to your configuration. The schema is inferred from the table (or the subquery, if used).

- id: read
  type: postgresql-source
  addr: jdbc:postgresql://host:port/dbname
  topics: table # (1)!
  meta:
    user: user
    password: password
  1. According to the PySpark documentation, the table name can be anything that is valid in the FROM clause of a SQL query, such as a subquery in parentheses.

Processors

query

This type is used to execute a SQL query on a DataFrame. The query is executed on the DataFrame given as input and the result is the output.

- id: process
  type: query
  sql: |
    SELECT * FROM mytable WHERE a > 10
  output:
    - print

map

This type applies a function to a DataFrame. The function is given inline or as a call to a function in a module (see UDF). It should take an iterator of DataFrames as input and return an iterator of DataFrames as output.

Here is an example with an inline function.

- id: process
  type: map
  name: process # (1)!
  schema: | # (2)!
    a INT,
    b INT,
    c INT
  asTable: mytable # (3)!
  fn: |
    def process(df_iterator):
        for df in df_iterator:
            df['c'] = df['a'] + df['b']
            yield df[['a', 'b', 'c']]
  output:
    - print
  1. Name of the Python function to call.
  2. Output schema.
  3. If asTable is set, the DataFrame is registered as a temporary table in the Spark session. This allows subsequent Tasks to run SQL queries on the DataFrame.
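The iterator contract can be exercised outside KFlow with plain pandas, assuming the function receives pandas DataFrames as in Spark's mapInPandas-style APIs; this is a sketch, not KFlow's runtime.

```python
import pandas as pd

def process(df_iterator):
    """Same shape as the inline fn above: consume an iterator of
    DataFrames, add column c = a + b, and yield the result."""
    for df in df_iterator:
        df['c'] = df['a'] + df['b']
        yield df[['a', 'b', 'c']]

# Feed one small batch through the function and collect the output.
batches = [pd.DataFrame({"a": [1, 2], "b": [10, 20]})]
result = pd.concat(process(iter(batches)), ignore_index=True)
```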

Sinks

print

Prints the DataFrame to the console, as well as the schema of the input in tree format.

fs-sink

Writes the DataFrame to one (or several) file(s) on a S3 bucket.

- id: write
  type: fs-sink
  coalesce: 1 # (3)!
  format: json | csv | parquet | delta
  partitionBy: col # (2)!
  topics: s3a://testcsv/mycsv.csv
  addr: 192.168.100.237:31318
  user: minioadmin
  password: password
  meta:
    # option for all formats
    writeMode: overwrite # (1)!
    # options for delta format
    maxRecordsPerFile: 1000
    # options for csv format
    csvHeader: true
    delimiter: ","

  1. The writeMode attribute specifies the write mode. overwrite is the default. Allowed values are:
    • overwrite
    • append
    • ignore (skip this operation if data already exists)
    • error (raise an exception if data already exists)
  2. The output is partitioned by the given column name. If specified, the output is laid out on the bucket similarly to Hive's partitioning scheme. This parameter is optional and is not available for the delta format.
  3. Coalesces the output into this number of partitions. This attribute is optional; no coalescing is done if it is not provided.
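The four writeMode values can be sketched against a local file (an illustrative helper; fs-sink applies the same semantics to objects on the bucket):

```python
import os
import tempfile

def write_with_mode(path, data, write_mode="overwrite"):
    """Illustrative sketch of writeMode semantics:
    overwrite replaces, append adds, ignore skips, error raises."""
    exists = os.path.exists(path)
    if write_mode == "error" and exists:
        raise FileExistsError(path)
    if write_mode == "ignore" and exists:
        return  # silently skip the write
    mode = "a" if write_mode == "append" else "w"  # overwrite truncates
    with open(path, mode) as f:
        f.write(data)

path = os.path.join(tempfile.mkdtemp(), "out.txt")
write_with_mode(path, "first\n", "overwrite")
write_with_mode(path, "second\n", "append")
write_with_mode(path, "ignored\n", "ignore")  # file exists: no-op
```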

jdbc-sink, sql-sink, postgresql-sink

All these types write data to a PostgreSQL table in a database. For clarity, use the type postgresql-sink.

In the following example, adapt host, port, dbname, user, password and table to your configuration.

- id: write
  type: postgresql-sink
  addr: jdbc:postgresql://host:port/dbname
  topics: table # (2)!
  meta:
    writeMode: append # (1)!
    user: user
    password: password
  1. The writeMode attribute specifies the write mode. append is the default. Allowed values are:
    • overwrite
    • append
    • ignore (skip this operation if data already exists)
    • error (raise an exception if data already exists)
  2. The table should exist in the database. If it does not, it is created with the schema of the DataFrame.

kafka-sink

Serializes each record of the DataFrame and outputs it to a Kafka topic.

- id: write
  type: kafka-sink
  topics: topic1
  addr: 192.168.2.35:9092,192.168.2.37:9092 # (1)!
  1. The addr attribute specifies the Kafka bootstrap servers as a comma-separated list of host:port pairs.

janusgraph-sink

This type allows implementing a custom sink by forking the runtime. It is a no-op in the current version of KFlow.